package e2e

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"strings"
	"testing"
	"time"

	"cloud.google.com/go/pubsub/v2"
	pubsubpb "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"github.com/jackc/pgx/v5"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"

	peer_bq "github.com/PeerDB-io/peerdb/flow/connectors/bigquery"
	connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
	"github.com/PeerDB-io/peerdb/flow/connectors/utils"
	"github.com/PeerDB-io/peerdb/flow/e2eshared"
	"github.com/PeerDB-io/peerdb/flow/generated/protos"
	"github.com/PeerDB-io/peerdb/flow/shared"
)

// PubSubSuite carries the per-suite state for the PubSub e2e tests: the test
// handle, the Postgres connector used as the source side, and the suffix that
// is appended to names created by the suite. Instances are built by
// SetupPubSubSuite.
type PubSubSuite struct {
	t      *testing.T                      // test handle returned by T()
	conn   *connpostgres.PostgresConnector // Postgres connector returned by Connector()
	suffix string                          // "ps_"-prefixed random suffix returned by Suffix()
}

// T exposes the *testing.T that this suite instance was created with.
func (s PubSubSuite) T() *testing.T {
	handle := s.t
	return handle
}

// Connector exposes the Postgres connector held by the suite.
func (s PubSubSuite) Connector() *connpostgres.PostgresConnector {
	pg := s.conn
	return pg
}

// Source wraps the suite's Postgres connector in a PostgresSource and returns
// it as the SuiteSource for this suite.
func (s PubSubSuite) Source() SuiteSource {
	src := PostgresSource{PostgresConnector: s.conn}
	return &src
}

// Conn returns the pgx connection obtained from the suite's Postgres connector.
func (s PubSubSuite) Conn() *pgx.Conn {
	connector := s.Connector()
	return connector.Conn()
}

// Suffix returns the suffix string assigned to this suite instance.
func (s PubSubSuite) Suffix() string {
	sfx := s.suffix
	return sfx
}

// ServiceAccount loads the GCP service account used by the PubSub tests.
//
// The TEST_BQ_CREDS environment variable must contain the path of a JSON file
// that unmarshals into a protos.BigqueryConfig; the decoded config is passed
// to peer_bq.NewBigQueryServiceAccount and its result is returned as-is.
// An error is returned when the variable is empty, the file cannot be read,
// or the JSON cannot be decoded.
func ServiceAccount() (*utils.GcpServiceAccount, error) {
	credsPath := os.Getenv("TEST_BQ_CREDS")
	if credsPath == "" {
		return nil, errors.New("TEST_BQ_CREDS env var not set")
	}

	raw, readErr := e2eshared.ReadFileToBytes(credsPath)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read file: %w", readErr)
	}

	var bqConfig *protos.BigqueryConfig
	if decodeErr := json.Unmarshal(raw, &bqConfig); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal json: %w", decodeErr)
	}

	return peer_bq.NewBigQueryServiceAccount(bqConfig)
}

// Peer builds a PUBSUB peer named "pubsub" plus the suite suffix, copies the
// credentials from sa into the peer's service account config, registers the
// peer through CreatePeer and returns it.
func (s PubSubSuite) Peer(sa *utils.GcpServiceAccount) *protos.Peer {
	peerName := AddSuffix(s, "pubsub")

	// Field-by-field copy of the loaded credentials into the proto message.
	account := &protos.GcpServiceAccount{
		AuthType:                sa.Type,
		ProjectId:               sa.ProjectID,
		PrivateKeyId:            sa.PrivateKeyID,
		PrivateKey:              sa.PrivateKey,
		ClientEmail:             sa.ClientEmail,
		ClientId:                sa.ClientID,
		AuthUri:                 sa.AuthURI,
		TokenUri:                sa.TokenURI,
		AuthProviderX509CertUrl: sa.AuthProviderX509CertURL,
		ClientX509CertUrl:       sa.ClientX509CertURL,
	}

	peer := &protos.Peer{
		Name: peerName,
		Type: protos.DBType_PUBSUB,
		Config: &protos.Peer_PubsubConfig{
			PubsubConfig: &protos.PubSubConfig{ServiceAccount: account},
		},
	}

	CreatePeer(s.t, peer)
	return peer
}

// DestinationTable returns the given table name unchanged; this suite applies
// no prefix or schema to destination names.
func (PubSubSuite) DestinationTable(table string) string {
	return table
}

// Teardown releases the suite's resources by delegating to TearDownPostgres.
func (s PubSubSuite) Teardown(ctx context.Context) {
	TearDownPostgres(ctx, s)
}

// SetupPubSubSuite creates a PubSubSuite for t. It generates a suffix of the
// form "ps_" followed by eight lower-cased random characters, sets up Postgres
// with that suffix, and fails the test immediately if that setup errors.
func SetupPubSubSuite(t *testing.T) PubSubSuite {
	t.Helper()

	randomPart := strings.ToLower(shared.RandomString(8))
	suffix := "ps_" + randomPart

	pg, err := SetupPostgres(t, suffix)
	require.NoError(t, err, "failed to setup postgres")

	suite := PubSubSuite{t: t, suffix: suffix}
	suite.conn = pg.PostgresConnector
	return suite
}

func Test_PubSub(t *testing.T) {
	e2eshared.RunSuite(t, SetupPubSubSuite)
}

// TestCreateTopic starts a CDC flow towards a PubSub peer without creating the
// destination topic itself, inserts one row into the source table, and then
// polls GetTopic (for up to three minutes) until a topic named after the flow
// exists. A NotFound response means "keep waiting"; any other error fails the
// test. The flow is cancelled at the end.
func (s PubSubSuite) TestCreateTopic() {
	srcTableName := AttachSchema(s, "pscreate")

	_, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s (
			id SERIAL PRIMARY KEY,
			val text
		);
	`, srcTableName))
	require.NoError(s.t, err)

	sa, err := ServiceAccount()
	require.NoError(s.t, err)

	// Register the Lua script referenced by flowConnConfig.Script below; its
	// onRecord returns the row's "val" column when the record has a row.
	_, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values
	('e2e_pscreate', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`)
	require.NoError(s.t, err)

	flowName := AddSuffix(s, "e2epscreate")
	connectionGen := FlowConnectionGenerationConfig{
		FlowJobName:      flowName,
		TableNameMapping: map[string]string{srcTableName: flowName},
		Destination:      s.Peer(sa).Name,
	}
	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
	flowConnConfig.Script = "e2e_pscreate"

	// Create a single client for all polls. The error is checked before Close
	// is deferred so that a failed creation is reported as such instead of
	// reaching Close on a client that was never constructed.
	psclient, err := sa.CreatePubSubClient(s.t.Context())
	require.NoError(s.t, err)
	defer psclient.Close()
	topic := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName)

	tc := NewTemporalClient(s.t)
	env := ExecutePeerflow(s.t, tc, flowConnConfig)
	SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

	_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
		INSERT INTO %s (id, val) VALUES (1, 'testval')
	`, srcTableName))
	require.NoError(s.t, err)

	EnvWaitFor(s.t, env, 3*time.Minute, "create topic", func() bool {
		_, err := psclient.TopicAdminClient.GetTopic(s.t.Context(), &pubsubpb.GetTopicRequest{
			Topic: topic,
		})
		if err == nil {
			return true
		}
		// NotFound just means the flow has not created the topic yet.
		if status.Code(err) != codes.NotFound {
			require.NoError(s.t, err)
		}
		return false
	})

	env.Cancel(s.t.Context())
	RequireEnvCanceled(s.t, env)
}

// TestSimple creates the destination topic and a subscription on it up front,
// starts a CDC flow towards the PubSub peer, inserts one row into the source
// table, and expects a message whose payload is "testval" to arrive on the
// subscription within three minutes. The flow is cancelled at the end.
func (s PubSubSuite) TestSimple() {
	srcTableName := AttachSchema(s, "pssimple")

	_, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s (
			id SERIAL PRIMARY KEY,
			val text
		);
	`, srcTableName))
	require.NoError(s.t, err)

	sa, err := ServiceAccount()
	require.NoError(s.t, err)

	// Register the Lua script referenced by flowConnConfig.Script below; its
	// onRecord returns the row's "val" column when the record has a row.
	_, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values
	('e2e_pssimple', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`)
	require.NoError(s.t, err)

	flowName := AddSuffix(s, "e2epssimple")
	connectionGen := FlowConnectionGenerationConfig{
		FlowJobName:      flowName,
		TableNameMapping: map[string]string{srcTableName: flowName},
		Destination:      s.Peer(sa).Name,
	}
	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
	flowConnConfig.Script = "e2e_pssimple"

	psclient, err := sa.CreatePubSubClient(s.t.Context())
	require.NoError(s.t, err)
	defer psclient.Close()
	topicName := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName)
	_, err = psclient.TopicAdminClient.CreateTopic(s.t.Context(), &pubsubpb.Topic{
		Name: topicName,
	})
	require.NoError(s.t, err)
	// The subscription is created before the flow starts so that anything the
	// flow publishes is retained for the receiver started further down.
	sub, err := psclient.SubscriptionAdminClient.CreateSubscription(s.t.Context(), &pubsubpb.Subscription{
		Name:                     fmt.Sprintf("projects/%s/subscriptions/%s", psclient.Project(), flowName),
		Topic:                    topicName,
		MessageRetentionDuration: durationpb.New(10 * time.Minute),
		ExpirationPolicy:         &pubsubpb.ExpirationPolicy{Ttl: durationpb.New(24 * time.Hour)},
	})
	require.NoError(s.t, err)

	tc := NewTemporalClient(s.t)
	env := ExecutePeerflow(s.t, tc, flowConnConfig)
	SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

	_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
		INSERT INTO %s (id, val) VALUES (1, 'testval')
	`, srcTableName))
	require.NoError(s.t, err)

	ctx, cancel := context.WithTimeout(s.t.Context(), 3*time.Minute)
	defer cancel()

	msgs := make(chan *pubsub.Message)
	go func() {
		subclient := psclient.Subscriber(sub.Name)
		_ = subclient.Receive(ctx, func(_ context.Context, m *pubsub.Message) {
			// Only one message is read from msgs. Guard the send with ctx so
			// a second (or redelivered) message cannot block this callback
			// forever and keep Receive from returning.
			select {
			case msgs <- m:
				m.Ack()
			case <-ctx.Done():
			}
		})
	}()
	select {
	case msg := <-msgs:
		require.NotNil(s.t, msg)
		require.Equal(s.t, "testval", string(msg.Data))
	case <-ctx.Done():
		s.t.Log("UNEXPECTED TIMEOUT PubSub subscription waiting on message")
		s.t.Fail()
	}
	// The receiver is no longer needed; stop it before tearing the flow down.
	cancel()

	env.Cancel(s.t.Context())
	RequireEnvCanceled(s.t, env)
}

// TestInitialLoad creates the destination topic and a subscription on it,
// inserts one row into the source table BEFORE the flow is started, and runs
// the flow with DoInitialSnapshot enabled. It then expects a message whose
// payload is "testval" to arrive on the subscription within three minutes.
// The flow is cancelled at the end.
func (s PubSubSuite) TestInitialLoad() {
	srcTableName := AttachSchema(s, "psinitial")

	_, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s (
			id SERIAL PRIMARY KEY,
			val text
		);
	`, srcTableName))
	require.NoError(s.t, err)

	sa, err := ServiceAccount()
	require.NoError(s.t, err)

	// Register the Lua script referenced by flowConnConfig.Script below; its
	// onRecord returns the row's "val" column when the record has a row.
	_, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values
	('e2e_psinitial', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`)
	require.NoError(s.t, err)

	flowName := AddSuffix(s, "e2epsinitial")
	connectionGen := FlowConnectionGenerationConfig{
		FlowJobName:      flowName,
		TableNameMapping: map[string]string{srcTableName: flowName},
		Destination:      s.Peer(sa).Name,
	}
	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
	flowConnConfig.Script = "e2e_psinitial"
	flowConnConfig.DoInitialSnapshot = true

	psclient, err := sa.CreatePubSubClient(s.t.Context())
	require.NoError(s.t, err)
	defer psclient.Close()
	topicName := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName)
	_, err = psclient.TopicAdminClient.CreateTopic(s.t.Context(), &pubsubpb.Topic{
		Name: topicName,
	})
	require.NoError(s.t, err)
	// The subscription is created before the flow starts so that anything the
	// flow publishes is retained for the receiver started further down.
	sub, err := psclient.SubscriptionAdminClient.CreateSubscription(s.t.Context(), &pubsubpb.Subscription{
		Name:                     fmt.Sprintf("projects/%s/subscriptions/%s", psclient.Project(), flowName),
		Topic:                    topicName,
		MessageRetentionDuration: durationpb.New(10 * time.Minute),
		ExpirationPolicy:         &pubsubpb.ExpirationPolicy{Ttl: durationpb.New(24 * time.Hour)},
	})
	require.NoError(s.t, err)
	// Seed the row before the flow exists, so it is already in the table when
	// the flow (with DoInitialSnapshot set) starts.
	_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(
		`INSERT INTO %s (id, val) VALUES (1, 'testval')`, srcTableName))
	require.NoError(s.t, err)

	tc := NewTemporalClient(s.t)
	env := ExecutePeerflow(s.t, tc, flowConnConfig)
	SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

	ctx, cancel := context.WithTimeout(s.t.Context(), 3*time.Minute)
	defer cancel()

	msgs := make(chan *pubsub.Message)
	go func() {
		subclient := psclient.Subscriber(sub.Name)
		_ = subclient.Receive(ctx, func(_ context.Context, m *pubsub.Message) {
			// Only one message is read from msgs. Guard the send with ctx so
			// a second (or redelivered) message cannot block this callback
			// forever and keep Receive from returning.
			select {
			case msgs <- m:
				m.Ack()
			case <-ctx.Done():
			}
		})
	}()
	select {
	case msg := <-msgs:
		require.NotNil(s.t, msg)
		require.Equal(s.t, "testval", string(msg.Data))
	case <-ctx.Done():
		s.t.Log("UNEXPECTED TIMEOUT PubSub subscription waiting on message")
		s.t.Fail()
	}
	// The receiver is no longer needed; stop it before tearing the flow down.
	cancel()

	env.Cancel(s.t.Context())
	RequireEnvCanceled(s.t, env)
}
